Applies to Hudi version 0.10.1.
1 Configuration
- Spark 2.4.3+ is supported.
- Spark DML support was added in 0.9.0, but it is still experimental.
- Spark 3 support:
Hudi | Supported Spark 3 version |
---|---|
0.10.0 | 3.1.x (default build), 3.0.x |
0.7.0 - 0.9.0 | 3.0.x |
0.6.0 and prior | not supported |
- Spark SQL
Reads and writes are enabled through HoodieSparkSessionExtension.
Note that spark-avro must also be added, with its version kept consistent with the Spark version in use (e.g. when launching Spark SQL for Spark 3.1).
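As a minimal sketch of what that configuration amounts to, the same settings can also be applied programmatically when building a SparkSession (the application name below is illustrative; the Hudi Spark bundle and a matching spark-avro still have to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Register the Hudi SQL extension and the serializer Hudi expects.
// With spark-shell / spark-sql the same two settings are normally passed as --conf options.
val spark = SparkSession.builder()
  .appName("hudi-quickstart") // illustrative name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()
```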
- Spark DataSource
The examples below use the built-in data generator; see DataGenerator for details.
```scala
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
```
2 Spark SQL type support
Spark | Hudi | Notes |
---|---|---|
boolean | boolean | |
byte | int | * |
short | int | * |
integer | int | |
long | long | |
date | date | |
timestamp | timestamp | |
float | float | |
double | double | |
string | string | |
decimal | decimal | |
binary | bytes | * |
array | array | |
map | map | |
struct | struct | |
char | not supported | |
varchar | not supported | |
numeric | not supported | |
null | not supported | |
object | not supported |
3 Creating tables
With the DataSource API, the table is created automatically on the first write.
With Spark SQL, the table must be created explicitly, which requires deciding:
- Table type: cow or mor.
- Partitioned or not: determined by whether a partitioned by clause is present.
- Managed or external: an external table is created when location is specified or create external table is used; otherwise a managed (internal) table is created. See managed vs. external tables.
Notes:
- Since 0.10.0 a primary key must be defined, including for tables created with earlier versions; uuid is used as the primary key by default.
- primaryKey, preCombineField and type are case-sensitive.
- Prefer setting them through tblproperties rather than options.
- Tables created through Spark SQL default hoodie.table.keygenerator.class to org.apache.hudi.keygen.ComplexKeyGenerator and set hoodie.datasource.write.hive_style_partitioning to true.
For example, a cow table with the default primaryKey 'uuid' and no preCombineField can be created as sketched below.
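A sketch of the DDL, run here from the spark-shell through spark.sql; the schemas follow the tables referenced later in this post, and the location path is illustrative:

```scala
// cow table with the default primaryKey 'uuid' and no preCombineField
spark.sql("""
  create table hudi_cow_nonpcf_tbl (
    uuid int,
    name string,
    price double
  ) using hudi
""")

// mor table with an explicit primary key and preCombineField
spark.sql("""
  create table hudi_mor_tbl (
    id int,
    name string,
    price double,
    ts bigint
  ) using hudi
  tblproperties (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'ts'
  )
""")

// partitioned cow table; the location clause makes it an external table (path is illustrative)
spark.sql("""
  create table hudi_cow_pt_tbl (
    id bigint,
    name string,
    ts bigint,
    dt string,
    hh string
  ) using hudi
  tblproperties (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
  )
  partitioned by (dt, hh)
  location '/tmp/hudi/hudi_cow_pt_tbl'
""")
```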
Key configuration
Parameter Name | Default | Introduction |
---|---|---|
primaryKey | uuid | The primary key names of the table, multiple fields separated by commas. Same as hoodie.datasource.write.recordkey.field |
preCombineField | | The pre-combine field of the table. Same as hoodie.datasource.write.precombine.field |
type | cow | The table type to create. type = ‘cow’ means a COPY-ON-WRITE table, while type = ‘mor’ means a MERGE-ON-READ table. Same as hoodie.datasource.write.table.type |
4 Inserting data
- Scala (spark-shell)
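A sketch of the insert path using the data generator set up in the configuration section (this mirrors the standard quickstart flow; mode(Overwrite) recreates the table if it already exists):

```scala
// spark-shell
// generate some new trips and load them into a DataFrame
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

// write a Hudi table keyed on uuid and partitioned by partitionpath
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
```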
- Spark SQL: inserting into non-partitioned tables, as sketched below.
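A sketch of the SQL inserts, shown here through spark.sql and using the example tables from the create-table section (the values are illustrative):

```scala
// insert into the non-partitioned tables
spark.sql("insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20")
spark.sql("insert into hudi_mor_tbl select 1, 'a1', 20, 1000")

// insert into the partitioned table, with a static partition spec
spark.sql("insert into hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '11') select 2, 'a2', 1000")
```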
5 Querying data
(1) Scala
Load the data (spark-shell):
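A sketch of loading the table back and registering it as the temporary view used by the queries below:

```scala
// spark-shell
// snapshot query: read the latest state of the table
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)

// register a view so the data can be queried with SQL
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
```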
Time travel query (>= 0.9.0)
The query time can be given in three formats, all passed through spark.read:
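A sketch of the three formats, using the "as.of.instant" read option; the timestamps below are placeholders:

```scala
// commit-time format (yyyyMMddHHmmss)
spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108").
  load(basePath)

// full timestamp
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14:11:08").
  load(basePath)

// date only, equivalent to "2021-07-28 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)
```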
Note
Since version 0.9.0 the built-in file index HoodieFileIndex is supported; it enables partition pruning and metadata-backed file listing for queries.
It also supports non-global query paths, i.e. the table can be queried by its base path without appending "/*".
(2) Spark SQL
```sql
select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0
```
6 Updating data
(1) Scala (spark-shell)
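A sketch of the update flow: generate updates for existing keys with the data generator and write them in Append mode, which upserts them into the same table:

```scala
// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
```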
(2) Spark SQL
1) Update
The table must have preCombineField set. Syntax and example:
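A sketch of the UPDATE statement, run through spark.sql against the mor example table (which has preCombineField defined):

```scala
// UPDATE tableIdentifier SET column = EXPRESSION (, column = EXPRESSION) [WHERE boolExpression]
spark.sql("update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1")
```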
2) MergeInto
Syntax and example:
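A sketch of MERGE INTO, using an inline source relation; the names and values are illustrative:

```scala
spark.sql("""
  merge into hudi_mor_tbl as target
  using (
    select 1 as id, 'a1_new' as name, 10.0 as price, 2000 as ts
  ) source
  on target.id = source.id
  when matched then update set *
  when not matched then insert *
""")
```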
7 Incremental query
Query the records written since a given point in time (spark-shell):
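A sketch of an incremental query: pick a begin instant from the commit timeline and read only the records written after it (this relies on the setup and inserts from the earlier sections):

```scala
// spark-shell
// reload the latest snapshot so the commit timeline below is current
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

// pick a begin time from the existing commits (here: the second newest)
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").
  map(k => k.getString(0)).
  take(50)
val beginTime = commits(commits.length - 2)

// read only the records committed after beginTime
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
```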
8 Point-in-time query
Query the records written within a given time range (spark-shell):
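A sketch of a point-in-time query: the same incremental read, but bounded by both a begin and an end instant (beginTime "000" stands for the earliest possible instant; endTime reuses a commit time from the incremental query above):

```scala
// spark-shell
val beginTime = "000" // earliest possible instant
val endTime = commits(commits.length - 2)

val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
```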
9 Deleting data
Scala
Delete operations are only supported in Append mode.
```scala
// spark-shell
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// run the same read query as above.
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
```
Spark SQL
```sql
-- Syntax
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

-- Examples
delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
```
10 Insert overwrite
Compared with upsert, this is better suited to batch ETL jobs that recompute entire target partitions (as opposed to incrementally updating the table), because it bypasses the indexing, precombining and repartitioning steps of the upsert path.
Scala
```scala
// spark-shell
spark.
  read.format("hudi").
  load(basePath).
  select("uuid", "partitionpath").
  sort("partitionpath", "uuid").
  show(100, false)

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(), "insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

// Should have different keys now for San Francisco alone, from query before.
spark.
  read.format("hudi").
  load(basePath).
  select("uuid", "partitionpath").
  sort("partitionpath", "uuid").
  show(100, false)
```
Spark SQL
Insert overwrite on a partitioned table maps to the insert_overwrite operation (only the matched partitions are overwritten), while on a non-partitioned table it maps to insert_overwrite_table (the whole table is overwritten).
```sql
-- insert overwrite non-partitioned table
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

-- insert overwrite partitioned table with dynamic partition
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';

-- insert overwrite partitioned table with static partition
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;
```
11 Other commands
(1) Altering tables
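A sketch of the ALTER TABLE syntax accepted by Hudi's Spark SQL layer, run through spark.sql; the new table name, column and property below are illustrative:

```scala
// rename a table
spark.sql("alter table hudi_cow_nonpcf_tbl rename to hudi_cow_nonpcf_tbl2")

// add a column
spark.sql("alter table hudi_cow_nonpcf_tbl2 add columns(remark string)")

// change a column's type
spark.sql("alter table hudi_cow_nonpcf_tbl2 change column uuid uuid bigint")

// set a table property
spark.sql("alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10')")
```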
(2) Partition commands
The current show partitions command derives its output from the file system paths, so it may no longer be accurate after a partition is dropped or all data inside a partition is deleted. Syntax and examples are sketched below.
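A sketch of the partition commands against the partitioned example table, run through spark.sql:

```scala
// list the partitions of a partitioned table
spark.sql("show partitions hudi_cow_pt_tbl").show(false)

// drop one partition by its full partition spec
spark.sql("alter table hudi_cow_pt_tbl drop partition (dt = '2021-12-09', hh = '10')")
```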